package com.xj.socket.tcp.nio.server;

import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Wraps a {@link Selector} and services only the read and write events of the
 * channels registered with it.
 * <p>
 * An instance is a {@link Runnable}; its {@link #run()} method loops on
 * {@code select()} and dispatches every ready key. A failure on a single
 * connection closes that connection only and does not stop the loop.
 *
 * @author xiajun
 */
public class ReadWriteSelector implements Runnable {
	private final static Logger log=Logger.getLogger(ReadWriteSelector.class);
	/** Size in bytes of the scratch buffer used for each channel read. */
	private static final int READ_BUFFER_SIZE = 1024;
	/** Number of selectors created so far; only touched inside {@link #nextNum()}. */
	private static int nameNum = 0;
	private final Selector selector;
	private volatile boolean isRunning = false;
	/** Caller-supplied or generated name, used in log output. */
	private final String selectorName;
	/**
	 * Lock used to coordinate with the selector thread while it would otherwise
	 * be blocked in {@code select()}. The loop in {@link #run()} acquires and
	 * immediately releases this lock before every {@code select()} call, so a
	 * thread holding it keeps the loop from re-entering {@code select()}.
	 */
	public final Object lock=new Object();

	/**
	 * Creates a selector with a generated name of the form
	 * {@code readWriteSelector_N}, where N is the number of selectors created
	 * before this one.
	 *
	 * @throws java.io.IOException if the underlying selector cannot be opened
	 */
	public ReadWriteSelector() throws IOException {
		// Selector.open() is evaluated first, so a failed open does not consume a number.
		this(Selector.open(), "readWriteSelector_" + nextNum());
	}
	/**
	 * Creates a read/write selector with the given name.
	 *
	 * @param selectorName name of the selector
	 * @throws java.io.IOException if the underlying selector cannot be opened
	 */
	public ReadWriteSelector(String selectorName) throws IOException {
		this(Selector.open(), selectorName);
		// Named selectors still count towards the numbering of default names.
		nextNum();
	}
	/**
	 * Shared initialisation for the public constructors.
	 */
	private ReadWriteSelector(Selector selector, String selectorName) {
		this.selector = selector;
		this.selectorName = selectorName;
	}
	/**
	 * Returns the current creation counter and increments it. Reading and
	 * incrementing happen under the class lock so that two selectors created
	 * concurrently never receive the same number.
	 */
	private static synchronized int nextNum() {
		return nameNum++;
	}
	/**
	 * Thread entry point: loops forever waiting for read/write readiness and
	 * dispatching the ready keys. The loop ends only when {@code select()}
	 * fails or an unchecked exception escapes; the running flag is cleared on
	 * exit.
	 */
	@Override
	public void run() {
		this.setIsRunning(true);
		try {
			while (true) {
				// Barrier: wait here while another thread holds the lock.
				synchronized(lock){
				}
				log.debug("---------------");
				int i=selector.select();
				if (i<=0)
				{
					log.debug("select 小于0 "+i);
					continue;
				}
				this.accept(this.selector.selectedKeys());
			}
		} catch (IOException e) {
			log.error("",e);
		} finally {
			log.debug("选择器"+this.selectorName+" 停止工作。");
			this.setIsRunning(false);
		}
	}
	/**
	 * Dispatches every selected key to the read or write handler and removes it
	 * from the selected set.
	 *
	 * @param selectedKeys the selector's selected-key set
	 * @throws java.io.IOException if the reply or received text cannot be
	 *         converted with the utf-8 charset
	 */
	private void accept(Set<SelectionKey> selectedKeys) throws IOException {
		Iterator<SelectionKey> it = selectedKeys.iterator();
		while (it.hasNext()) {
			log.debug("------------");
			SelectionKey key = it.next();
			it.remove();
			// Querying readiness on a cancelled key throws CancelledKeyException,
			// which would terminate the whole select loop.
			if (!key.isValid()) {
				continue;
			}
			if (key.isReadable()) {
				this.handleRead(key);
			} else if (key.isWritable()) {
				this.handleWrite(key);
			}
		}
	}
	/**
	 * Drains all currently available bytes from the key's channel, prints them
	 * as utf-8 text and, if the channel is still open, additionally registers
	 * interest in writing so that a reply is sent.
	 */
	private void handleRead(SelectionKey key) throws IOException {
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
		// Accumulate every chunk; a message may be larger than one buffer.
		ByteArrayOutputStream received = new ByteArrayOutputStream();
		boolean open = true;
		try {
			int size;
			while ((size = sc.read(readBuffer)) > 0) {
				// The buffer is heap-backed and cleared before each read, so the
				// bytes just read occupy array()[0..size).
				received.write(readBuffer.array(), 0, size);
				readBuffer.clear();
			}
			if(size==-1){
				throw new IOException("客户端"+sc.socket().getRemoteSocketAddress()+"已经关闭");
			}
		} catch (IOException e) {
			log.warn(sc.socket()+"异常关闭...",e);
			closeQuietly(sc);
			open = false;
		}
		if (received.size() > 0) {
			System.out.println(new String(received.toByteArray(), "utf-8"));
			// Closing the channel cancels its key; touching interestOps on a
			// cancelled key would throw CancelledKeyException.
			if (open && key.isValid()) {
				key.interestOps(SelectionKey.OP_WRITE|SelectionKey.OP_READ);
			}
		}
	}
	/**
	 * Sends the fixed acknowledgement to the key's channel and switches the key
	 * back to read-only interest. A write failure closes this channel only.
	 */
	private void handleWrite(SelectionKey key) throws IOException {
		SocketChannel sc = (SocketChannel) key.channel();
		String info = "收到您的信息";
		ByteBuffer writeBuffer = ByteBuffer.wrap(info.getBytes("utf-8"));
		try {
			sc.write(writeBuffer);
		} catch (IOException e) {
			log.warn(sc.socket()+"异常关闭...",e);
			closeQuietly(sc);
			return;
		}
		key.interestOps(SelectionKey.OP_READ);
		if (writeBuffer.hasRemaining()) {
			// A non-blocking write may be short; do not claim success in that case.
			log.warn(sc.socket()+" reply not fully written, "+writeBuffer.remaining()+" bytes dropped");
		} else {
			System.out.println("发送消息成功...");
		}
	}
	/**
	 * Closes the channel, logging instead of propagating a failure so that one
	 * broken connection cannot stop the select loop.
	 */
	private static void closeQuietly(SocketChannel sc) {
		try {
			sc.close();
		} catch (IOException e) {
			log.warn(sc.socket()+" close failed",e);
		}
	}
	/**
	 * Returns the wrapped selector.
	 * @return Selector
	 */
	public Selector getSelector() {
		return selector;
	}
	/**
	 * Returns the name of this selector.
	 * @return String
	 */
	public String getSelectorName() {
		return selectorName;
	}
	/**
	 * Returns whether this selector is currently marked as working.
	 * @return boolean
	 */
	public boolean isRunning() {
		return isRunning;
	}
	/**
	 * Sets the working state of this selector.
	 * @param b true for working, false for stopped
	 */
	public void setIsRunning(boolean b) {
		isRunning = b;
	}
}
